/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.accumulo.core.iterators.user;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.OptionDescriber;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.hadoop.io.Text;

/**
 * The RowEncodingIterator is designed to provide row-isolation so that queries see mutations as
 * atomic. It does so by encapsulating an entire row of key/value pairs into a single key/value
 * pair, which is returned through the client as an atomic operation. This is an abstract class,
 * allowing the user to implement rowEncoder and rowDecoder such that the columns and values of a
 * given row may be encoded in a format best suited to the client.
 *
 * <p>
 * For an example implementation, see {@link WholeRowIterator}.
 *
 * <p>
 * One caveat is that when seeking in the WholeRowIterator using a range that starts at a
 * non-inclusive first key in a row, (e.g. seek(new Range(new Key(new Text("row")),false,...),...))
 * this iterator will skip to the next row. This is done in order to prevent repeated scanning of
 * the same row when system automatically creates ranges of that form, which happens in the case of
 * the client calling continueScan, or in the case of the tablet server continuing a scan after
 * swapping out sources.
 *
 * <p>
 * To regain the original key/value pairs of the row, call the rowDecoder function on the key/value
 * pair that this iterator returned.
 *
 * @see RowFilter
 */
public abstract class RowEncodingIterator
    implements SortedKeyValueIterator<Key,Value>, OptionDescriber {

  public static final String MAX_BUFFER_SIZE_OPT = "maxBufferSize";
  private static final long DEFAULT_MAX_BUFFER_SIZE = Long.MAX_VALUE;

  protected SortedKeyValueIterator<Key,Value> sourceIter;
  private Key topKey = null;
  private Value topValue = null;
  private long maxBufferSize = DEFAULT_MAX_BUFFER_SIZE;

  // decode a bunch of key value pairs that have been encoded into a single value
  /**
   * Given a value generated by the rowEncoder implementation, recreate the original Key, Value
   * pairs.
   */
  public abstract SortedMap<Key,Value> rowDecoder(Key rowKey, Value rowValue) throws IOException;

  /**
   * Take a stream of keys and values. Return values in the same order encoded such that all
   * portions of the key (except for the row value) and the original value are encoded in some way.
   */
  public abstract Value rowEncoder(List<Key> keys, List<Value> values) throws IOException;

  @Override
  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
    RowEncodingIterator newInstance;
    try {
      newInstance = this.getClass().getDeclaredConstructor().newInstance();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    newInstance.sourceIter = sourceIter.deepCopy(env);
    newInstance.maxBufferSize = maxBufferSize;
    return newInstance;
  }

  List<Key> keys = new ArrayList<>();
  List<Value> values = new ArrayList<>();

  private void prepKeys() throws IOException {
    long kvBufSize = 0;
    if (topKey != null)
      return;
    Text currentRow;
    do {
      if (!sourceIter.hasTop())
        return;
      currentRow = new Text(sourceIter.getTopKey().getRow());
      keys.clear();
      values.clear();
      while (sourceIter.hasTop() && sourceIter.getTopKey().getRow().equals(currentRow)) {
        Key sourceTopKey = sourceIter.getTopKey();
        Value sourceTopValue = sourceIter.getTopValue();
        keys.add(new Key(sourceTopKey));
        values.add(new Value(sourceTopValue));
        kvBufSize += sourceTopKey.getSize() + sourceTopValue.getSize() + 128;
        if (kvBufSize > maxBufferSize) {
          throw new IllegalArgumentException(
              "Exceeded buffer size of " + maxBufferSize + " for row: " + sourceTopKey.getRow());
        }
        sourceIter.next();
      }
    } while (!filter(currentRow, keys, values));

    topKey = new Key(currentRow);
    topValue = rowEncoder(keys, values);
  }

  /**
   *
   * @param currentRow
   *          All keys have this in their row portion (do not modify!).
   * @param keys
   *          One key for each key in the row, ordered as they are given by the source iterator (do
   *          not modify!).
   * @param values
   *          One value for each key in keys, ordered to correspond to the ordering in keys (do not
   *          modify!).
   * @return true if we want to keep the row, false if we want to skip it
   */
  protected boolean filter(Text currentRow, List<Key> keys, List<Value> values) {
    return true;
  }

  @Override
  public Key getTopKey() {
    return topKey;
  }

  @Override
  public Value getTopValue() {
    return topValue;
  }

  @Override
  public boolean hasTop() {
    return topKey != null;
  }

  @Override
  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
      IteratorEnvironment env) throws IOException {
    sourceIter = source;
    if (options.containsKey(MAX_BUFFER_SIZE_OPT)) {
      maxBufferSize =
          ConfigurationTypeHelper.getFixedMemoryAsBytes(options.get(MAX_BUFFER_SIZE_OPT));
    }
  }

  @Override
  public IteratorOptions describeOptions() {
    String desc = "This iterator encapsulates an entire row of Key/Value pairs"
        + " into a single Key/Value pair.";
    String bufferDesc = "Maximum buffer size (in accumulo memory spec) to use"
        + " for buffering keys before throwing a BufferOverflowException.";
    HashMap<String,String> namedOptions = new HashMap<>();
    namedOptions.put(MAX_BUFFER_SIZE_OPT, bufferDesc);
    return new IteratorOptions(getClass().getSimpleName(), desc, namedOptions, null);
  }

  @Override
  public boolean validateOptions(Map<String,String> options) {
    String maxBufferSizeStr = options.get(MAX_BUFFER_SIZE_OPT);
    try {
      ConfigurationTypeHelper.getFixedMemoryAsBytes(maxBufferSizeStr);
    } catch (Exception e) {
      throw new IllegalArgumentException(
          "Failed to parse opt " + MAX_BUFFER_SIZE_OPT + " " + maxBufferSizeStr, e);
    }
    return true;
  }

  @Override
  public void next() throws IOException {
    topKey = null;
    topValue = null;
    prepKeys();
  }

  @Override
  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
      throws IOException {
    topKey = null;
    topValue = null;

    Key sk = range.getStartKey();

    if (sk != null && sk.getColumnFamilyData().length() == 0
        && sk.getColumnQualifierData().length() == 0 && sk.getColumnVisibilityData().length() == 0
        && sk.getTimestamp() == Long.MAX_VALUE && !range.isStartKeyInclusive()) {
      // assuming that we are seeking using a key previously returned by this iterator
      // therefore go to the next row
      Key followingRowKey = sk.followingKey(PartialKey.ROW);
      if (range.getEndKey() != null && followingRowKey.compareTo(range.getEndKey()) > 0)
        return;

      range = new Range(sk.followingKey(PartialKey.ROW), true, range.getEndKey(),
          range.isEndKeyInclusive());
    }

    sourceIter.seek(range, columnFamilies, inclusive);
    prepKeys();
  }

}
